/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.cluster.sdv.generated


import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, InputStream}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.avro
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, Encoder}
import org.apache.commons.lang3.RandomStringUtils
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.common.util.QueryTest
import org.junit.Assert
import org.scalatest.BeforeAndAfterEach

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

/**
 * Test Class for SDKwriterTestcase to verify all scenarios
 */
class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {

  var writerPath =
    s"${ resourcesPath }" + "/SparkCarbonFileFormat/WriterOutput1/"
  // scalastyle:off lineLength
  /**
   * Drops every table the tests may have created and deletes the SDK writer
   * output directory so each test starts from a clean state.
   * Note: parentheses added to match the ScalaTest `beforeEach()` signature
   * and the sibling `afterEach()` override (side-effecting method convention).
   */
  override def beforeEach(): Unit = {
    sql("DROP TABLE IF EXISTS sdkTable1")
    sql("DROP TABLE IF EXISTS sdkTable2")
    sql("DROP TABLE IF EXISTS table1")
    cleanTestData()
  }

  /**
   * Mirror of beforeEach: drop the test tables and clear the SDK writer
   * output directory after every test.
   */
  override def afterEach(): Unit = {
    Seq("sdkTable1", "sdkTable2", "table1")
      .foreach(tableName => sql(s"DROP TABLE IF EXISTS $tableName"))
    cleanTestData()
  }

  /** Deletes the SDK writer output directory; returns the deletion result. */
  def cleanTestData(): Boolean = {
    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
  }

  /** Writes 3 well-formed rows (name, age, height) to writerPath with no load options. */
  def buildTestDataSingleFile(): Any = {
    buildTestData(3, null)
  }

  /**
   * Writes 3 rows with bad-records action FORCE (bad values are forced to null).
   * @param writerPath unused; retained for signature compatibility — output goes
   *                   to the class-level writerPath via buildTestData.
   */
  def buildTestDataWithBadRecordForce(writerPath: String): Any = {
    // val instead of var: the options map is never reassigned
    val options = Map("bAd_RECords_action" -> "FORCE").asJava
    buildTestData(3, options)
  }

  /**
   * Writes 15001 rows with bad-records action FAIL (bad rows abort the load).
   * @param writerPath unused; retained for signature compatibility — output goes
   *                   to the class-level writerPath via buildTestData.
   */
  def buildTestDataWithBadRecordFail(writerPath: String): Any = {
    // val instead of var: the options map is never reassigned
    val options = Map("bAd_RECords_action" -> "FAIL").asJava
    buildTestData(15001, options)
  }

  /** Convenience overload: sorts by "name" and writes to the class-level writerPath. */
  def buildTestData(rows: Int, options: util.Map[String, String]): Any = {
    buildTestData(rows, options, List("name"), writerPath)
  }

  /**
   * Prepares SDK writer output: writes `rows` CSV rows with schema
   * (name: string, age: int, height: double) under `writerPath`.
   *
   * When `options` is non-null the first three rows are deliberately written
   * with a non-numeric height ("abc") so bad-record handling can be tested,
   * and one trailing valid row is appended so a carbondata file is generated.
   *
   * @param rows        number of rows to write
   * @param options     load options (e.g. bad_records_action); null for none
   * @param sortColumns columns the writer sorts by
   * @param writerPath  output directory for the SDK writer
   */
  def buildTestData(rows: Int, options: util.Map[String, String], sortColumns: List[String], writerPath: String): Any = {
    val schema = new StringBuilder()
      .append("[ \n")
      .append("   {\"name\":\"string\"},\n")
      .append("   {\"age\":\"int\"},\n")
      .append("   {\"height\":\"double\"}\n")
      .append("]")
      .toString()

    try {
      val builder = CarbonWriter.builder()
      val writer =
        if (options != null) {
          builder.outputPath(writerPath)
            .sortBy(sortColumns.toArray)
            .uniqueIdentifier(
              System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
            .withCsvInput(Schema.parseJson(schema)).build()
        } else {
          builder.outputPath(writerPath)
            .sortBy(sortColumns.toArray)
            .uniqueIdentifier(
              System.currentTimeMillis).withBlockSize(2)
            .withCsvInput(Schema.parseJson(schema)).build()
        }
      var i = 0
      while (i < rows) {
        if ((options != null) && (i < 3)) {
          // writing a bad record: "abc" is not parseable as double height
          writer.write(Array[String]("abc" + i, String.valueOf(i.toDouble / 2), "abc"))
        } else {
          writer.write(Array[String]("abc" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
        }
        i += 1
      }
      if (options != null) {
        // Keep one valid record. else carbon data file will not generate
        writer.write(Array[String]("abc" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
      }
      writer.close()
    } catch {
      // Fail the calling test loudly. The previous `case _ => None` clause
      // silently swallowed non-Exception Throwables (Errors) and was removed.
      case ex: Exception => throw new RuntimeException(ex)
    }
  }

  /**
   * Writes 3 rows with bad-records action IGNORE (bad rows are dropped).
   * @param writerPath unused; retained for signature compatibility — output goes
   *                   to the class-level writerPath via buildTestData.
   */
  def buildTestDataWithBadRecordIgnore(writerPath: String): Any = {
    // val instead of var: the options map is never reassigned
    val options = Map("bAd_RECords_action" -> "IGNORE").asJava
    buildTestData(3, options)
  }

  /**
   * Writes 3 rows with bad-records action REDIRECT (bad rows go to a redirect file).
   * @param writerPath unused; retained for signature compatibility — output goes
   *                   to the class-level writerPath via buildTestData.
   */
  def buildTestDataWithBadRecordRedirect(writerPath: String): Any = {
    // val instead of var: the options map is never reassigned
    val options = Map("bAd_RECords_action" -> "REDIRECT").asJava
    buildTestData(3, options)
  }

  /**
   * Recursively deletes every plain file under `path` whose name ends with
   * `extension`; directories are descended into, never deleted themselves.
   */
  def deleteFile(path: String, extension: String): Unit = {
    val root: CarbonFile = FileFactory.getCarbonFile(path)
    root.listFiles.foreach { entry =>
      if (entry.isDirectory) {
        deleteFile(entry.getPath, extension)
      } else if (entry.getName.endsWith(extension)) {
        CarbonUtil.deleteFoldersAndFilesSilent(entry)
      }
    }
  }

  // External table over SDK writer output: all 3 rows must be readable.
  test("test create External Table with WriterPath") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable
         | STORED AS carbondata
         | LOCATION '$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
      Row("abc1", 1, 0.5),
      Row("abc2", 2, 1.0)))
  }

  // External table creation with a COMMENT clause must still read all rows.
  test("test create External Table with Comment") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable comment 'this is comment'
         |STORED AS carbondata
         |LOCATION '$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
      Row("abc1", 1, 0.5),
      Row("abc2", 2, 1.0)))
  }

  // Projection, filter, LIKE, LIMIT and aggregate queries over SDK-written files.
  test("test create External Table and test files written from sdk writer") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")

    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)
    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
      Row("abc1", 1, 0.5),
      Row("abc2", 2, 1.0)))

    checkAnswer(sql("select name from sdkTable"), Seq(Row("abc0"),
      Row("abc1"),
      Row("abc2")))

    checkAnswer(sql("select age from sdkTable"), Seq(Row(0), Row(1), Row(2)))
    checkAnswer(sql("select * from sdkTable where age > 1 and age < 8"),
      Seq(Row("abc2", 2, 1.0)))

    checkAnswer(sql("select * from sdkTable where name = 'abc2'"),
      Seq(Row("abc2", 2, 1.0)))

    checkAnswer(sql("select * from sdkTable where name like '%b%' limit 2"),
      Seq(Row("abc0", 0, 0.0),
        Row("abc1", 1, 0.5)))

    checkAnswer(sql("select sum(age) from sdkTable where name like 'abc%'"), Seq(Row(3)))
    checkAnswer(sql("select count(*) from sdkTable where name like 'abc%' "), Seq(Row(3)))
    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(3)))

  }

  // INSERT INTO an external table: count of age=1 rows goes from 1 to 2.
  test("test create External Table and test insert into external table") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")

    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
      Seq(Row(1)))

    sql("insert into sdktable select 'def0',1,5.5")
    sql("insert into sdktable select 'def1',5,6.6")

    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
      Seq(Row(2)))
  }

  // Insert from a 3-column external table into a 2-column normal table:
  // the extra source column (height) is dropped on insert.
  test("test create External Table and test insert into normal table with different schema") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql("DROP TABLE IF EXISTS table1")

    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)
    sql(
      "create table if not exists table1 (name string, age int) STORED AS carbondata")
    sql("insert into table1 select * from sdkTable")
    checkAnswer(sql("select * from table1"), Seq(Row("abc0", 0),
      Row("abc1", 1),
      Row("abc2", 2)))
  }

  // Both external tables point at the same writerPath, so inserting table2's
  // 3 rows into table1 doubles the shared location to 6 rows.
  test("test Insert into External Table from another External Table with Same Schema") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable1")
    sql("DROP TABLE IF EXISTS sdkTable2")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable1 STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable2 STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)

    sql("insert into sdkTable1 select *from sdkTable2")
    checkAnswer(sql("select count(*) from sdkTable1"), Seq(Row(6)))
  }

  // Schema is inferred from the carbondata files; no column list is given.
  test("test create External Table without Schema") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")

    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
      Row("abc1", 1, 0.5),
      Row("abc2", 2, 1.0)))
  }

  // INSERT OVERWRITE replaces the external table content, so the original
  // age=1 row disappears.
  test("test External Table with insert overwrite") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql("DROP TABLE IF EXISTS table1")

    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
      Row("abc1", 1, 0.5),
      Row("abc2", 2, 1.0)))

    sql(
      "create table if not exists table1 (name string, age int, height double) STORED AS carbondata")
    sql(s"""insert into table1 values ("aaaaa", 12, 20)""")

    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
      Seq(Row(1)))

    sql("insert overwrite table sdkTable select * from table1")

    checkAnswer(sql(s"""select count(*) from sdkTable where age = 1"""),
      Seq(Row(0)))
  }

  // TBLPROPERTIES is rejected for external tables with an AnalysisException.
  test("test create External Table with Table properties should fail") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")

    val ex = intercept[AnalysisException] {
      sql(
        s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
           |LOCATION
           |'$writerPath' TBLPROPERTIES('sort_scope'='local_sort') """.stripMargin)
    }
    assert(ex.message.contains("Table properties are not supported for external table"))
  }

  // With the .carbondata and index files deleted, the location is invalid and
  // external table creation must fail.
  test("Read sdk writer output file and test without carbondata and carbonindex files should fail")
  {
    buildTestDataSingleFile()
    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
    deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")

    val exception = intercept[Exception] {
      // data source file format
      sql(
        s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
           |'$writerPath' """.stripMargin)
    }
    assert(exception.getMessage()
      .contains("Operation not allowed: Invalid table path provided:"))
  }

  // CREATE TABLE ... AS SELECT from an external table copies all rows.
  test("test create External Table and test CTAS") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql("DROP TABLE IF EXISTS table1")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable"), Seq(Row("abc0", 0, 0.0),
      Row("abc1", 1, 0.5),
      Row("abc2", 2, 1.0)))

    sql("create table table1 STORED AS carbondata as select *from sdkTable")

    checkAnswer(sql("select * from table1"), Seq(Row("abc0", 0, 0.0),
      Row("abc1", 1, 0.5),
      Row("abc2", 2, 1.0)))
  }

  // Inner, left-outer and right-outer self-joins on two external tables over
  // the same data return the same matched rows.
  test("test create External Table and test JOIN on External Tables") {
    buildTestDataSingleFile()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql("DROP TABLE IF EXISTS sdkTable1")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable1 STORED AS carbondata
         |LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
    checkAnswer(sql(
      "select * from sdkTable LEFT OUTER JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
    checkAnswer(sql(
      "select * from sdkTable RIGHT OUTER JOIN sdkTable1 on (sdkTable.age=sdkTable1.age)"),
      Seq(Row("abc0", 0, 0.0, "abc0", 0, 0.0),
        Row("abc1", 1, 0.5, "abc1", 1, 0.5),
        Row("abc2", 2, 1.0, "abc2", 2, 1.0)))
  }

  // Bad-record handling end-to-end: FORCE nulls out bad values, REDIRECT and
  // IGNORE drop the bad rows (only the trailing valid row survives).
  test("test create external table and test bad record") {
    // 1. Action = FORCE
    buildTestDataWithBadRecordForce(writerPath)
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
         |'$writerPath' """.stripMargin)
    checkAnswer(sql("select * from sdkTable"), Seq(
      Row("abc0", null, null),
      Row("abc1", null, null),
      Row("abc2", null, null),
      Row("abc3", 3, 1.5)))

    sql("DROP TABLE sdkTable")
    cleanTestData()

    // 2. Action = REDIRECT
    buildTestDataWithBadRecordRedirect(writerPath)
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable"), Seq(
      Row("abc3", 3, 1.5)))

    sql("DROP TABLE sdkTable")
    cleanTestData()

    // 3. Action = IGNORE
    buildTestDataWithBadRecordIgnore(writerPath)
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
         |'$writerPath' """.stripMargin)
    checkAnswer(sql("select * from sdkTable"), Seq(
      Row("abc3", 3, 1.5)))

  }

  /** Builds 3 Avro records of the struct (record) schema below. */
  def buildAvroTestDataStructType(): Any = {
    buildAvroTestDataStruct(3, null)
  }

  /**
   * Writes `rows` identical Avro records with schema
   * (name: string, age: int, address: record{street, city}) to writerPath.
   * NOTE(review): `options` is currently unused — kept for signature parity
   * with the CSV builders.
   */
  def buildAvroTestDataStruct(rows: Int,
      options: util.Map[String, String]): Any = {

    val mySchema =
      """
        |{"name": "address",
        | "type": "record",
        | "fields": [
        |  { "name": "name", "type": "string"},
        |  { "name": "age", "type": "int"},
        |  { "name": "address",  "type": {
        |    "type" : "record",  "name" : "my_address",
        |        "fields" : [
        |    {"name": "street", "type": "string"},
        |    {"name": "city", "type": "string"}]}}
        |]}
      """.stripMargin

    val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
    WriteFilesWithAvroWriter(rows, mySchema, json)
  }

  /** Builds 3 Avro records containing both a struct field and an array field. */
  def buildAvroTestDataBothStructArrayType(): Any = {
    buildAvroTestDataStructWithArrayType(3, null)
  }

  /**
   * Writes `rows` identical Avro records with schema
   * (name: string, age: int, address: record{street, city}, doorNum: array<int>).
   * NOTE(review): `options` is currently unused — kept for signature parity.
   */
  def buildAvroTestDataStructWithArrayType(rows: Int,
      options: util.Map[String, String]): Any = {

    val mySchema =
      """
                     {
                     |     "name": "address",
                     |     "type": "record",
                     |     "fields": [
                     |     { "name": "name", "type": "string"},
                     |     { "name": "age", "type": "int"},
                     |     {
                     |     "name": "address",
                     |     "type": {
                     |     "type" : "record",
                     |     "name" : "my_address",
                     |     "fields" : [
                     |     {"name": "street", "type": "string"},
                     |     {"name": "city", "type": "string"}
                     |     ]}
                     |     },
                     |     {"name" :"doorNum",
                     |     "type" : {
                     |     "type" :"array",
                     |     "items":{
                     |     "name" :"EachdoorNums",
                     |     "type" : "int",
                     |     "default":-1
                     |     }}
                     |     }]}
                     """.stripMargin

    val json =
      """ {"name":"bob", "age":10,
        |"address" : {"street":"abc", "city":"bang"},
        |"doorNum" : [1,2,3,4]}""".stripMargin
    WriteFilesWithAvroWriter(rows, mySchema, json)
  }

  /**
   * Converts `json` into an Avro GenericRecord per `mySchema` and writes it
   * `rows` times to the class-level writerPath via the SDK Avro input path.
   * Any failure fails the enclosing test through Assert.fail.
   */
  private def WriteFilesWithAvroWriter(rows: Int,
      mySchema: String,
      json: String): Unit = {
    // conversion to GenericData.Record
    val nn = new avro.Schema.Parser().parse(mySchema)
    val record = avroUtil.jsonToAvro(json, mySchema)

    try {
      val writer = CarbonWriter.builder
        .outputPath(writerPath)
        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
      var i = 0
      while (i < rows) {
        writer.write(record)
        i = i + 1
      }
      writer.close()
    }
    catch {
      case e: Exception =>
        e.printStackTrace()
        Assert.fail(e.getMessage)
    }
  }

  /** Builds 3 Avro records whose doorNum field is an array of structs. */
  def buildAvroTestDataArrayOfStructType(): Any = {
    buildAvroTestDataArrayOfStruct(3, null)
  }

  /**
   * Writes `rows` identical Avro records with schema
   * (name: string, age: int, doorNum: array<record{street, city}>).
   * NOTE(review): `options` is currently unused — kept for signature parity.
   */
  def buildAvroTestDataArrayOfStruct(rows: Int,
      options: util.Map[String, String]): Any = {

    val mySchema =
      """ {
        | "name": "address",
        | "type": "record",
        | "fields": [
        |  {
        |   "name": "name",
        |   "type": "string"
        |  },
        |  {
        |   "name": "age",
        |   "type": "int"
        |  },
        |  {
        |   "name": "doorNum",
        |   "type": {
        |    "type": "array",
        |    "items": {
        |     "type": "record",
        |     "name": "my_address",
        |     "fields": [
        |      {
        |       "name": "street",
        |       "type": "string"
        |      },
        |      {
        |       "name": "city",
        |       "type": "string"
        |      }
        |     ]
        |    }
        |   }
        |  }
        | ]
        |} """.stripMargin
    val json =
      """ {"name":"bob","age":10,"doorNum" :
        |[{"street":"abc","city":"city1"},
        |{"street":"def","city":"city2"},
        |{"street":"ghi","city":"city3"},
        |{"street":"jkl","city":"city4"}]} """.stripMargin
    WriteFilesWithAvroWriter(rows, mySchema, json)
  }

  /** Builds 3 Avro records whose address struct contains an int array. */
  def buildAvroTestDataStructOfArrayType(): Any = {
    buildAvroTestDataStructOfArray(3, null)
  }

  /**
   * Writes `rows` identical Avro records with schema
   * (name: string, age: int, address: record{street, city, doorNum: array<int>}).
   * NOTE(review): `options` is currently unused — kept for signature parity.
   */
  def buildAvroTestDataStructOfArray(rows: Int,
      options: util.Map[String, String]): Any = {

    val mySchema =
      """ {
        | "name": "address",
        | "type": "record",
        | "fields": [
        |  {
        |   "name": "name",
        |   "type": "string"
        |  },
        |  {
        |   "name": "age",
        |   "type": "int"
        |  },
        |  {
        |   "name": "address",
        |   "type": {
        |    "type": "record",
        |    "name": "my_address",
        |    "fields": [
        |     {
        |      "name": "street",
        |      "type": "string"
        |     },
        |     {
        |      "name": "city",
        |      "type": "string"
        |     },
        |     {
        |      "name": "doorNum",
        |      "type": {
        |       "type": "array",
        |       "items": {
        |        "name": "EachdoorNums",
        |        "type": "int",
        |        "default": -1
        |       }
        |      }
        |     }
        |    ]
        |   }
        |  }
        | ]
        |} """.stripMargin

    val json =
      """ {
        | "name": "bob",
        | "age": 10,
        | "address": {
        |  "street": "abc",
        |  "city": "bang",
        |  "doorNum": [
        |   1,
        |   2,
        |   3,
        |   4
        |  ]
        | }
        |} """.stripMargin
    WriteFilesWithAvroWriter(rows, mySchema, json)
  }

  // Avro record (struct) type: nested address comes back as a Row.
  test("Read sdk writer Avro output Record Type for nontransactional table") {
    buildAvroTestDataStructType()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql("select * from sdkTable"), Seq(
      Row("bob", 10, Row("abc", "bang")),
      Row("bob", 10, Row("abc", "bang")),
      Row("bob", 10, Row("abc", "bang"))))

  }

  // Combined struct + array fields round-trip through the external table.
  test("Read sdk writer Avro output with both Array and Struct Type for nontransactional table") {
    buildAvroTestDataBothStructArrayType()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
         |'$writerPath' """.stripMargin)
    checkAnswer(sql("select * from sdkTable"), Seq(
      Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4)),
      Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4)),
      Row("bob", 10, Row("abc", "bang"), mutable.WrappedArray.newBuilder[Int].+=(1, 2, 3, 4))))
  }

  // Array-of-struct data: only the row count is verified here.
  test("Read sdk writer Avro output with Array of struct for external table") {
    buildAvroTestDataArrayOfStructType()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql(s"""select count(*) from sdkTable"""),
      Seq(Row(3)))
  }

  // Struct-of-array data: only the row count is verified here.
  test("Read sdk writer Avro output with struct of Array for nontransactional table") {
    buildAvroTestDataStructOfArrayType()
    assert(FileFactory.getCarbonFile(writerPath).exists())
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(
      s"""CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION
         |'$writerPath' """.stripMargin)

    checkAnswer(sql(s"""select count(*) from sdkTable"""),
      Seq(Row(3)))
  }

  // varchar (longstring) column with 33000-char values writes and counts correctly.
  test("Test sdk with longstring") {
    // here we specify the longstring column as varchar
    val schema = new StringBuilder()
      .append("[ \n")
      .append("   {\"name\":\"string\"},\n")
      .append("   {\"address\":\"varchar\"},\n")
      .append("   {\"age\":\"int\"}\n")
      .append("]")
      .toString()
    val builder = CarbonWriter.builder()
    val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()

    for (i <- 0 until 5) {
      writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(33000), i.toString))
    }
    writer.close()

    assert(FileFactory.getCarbonFile(writerPath).exists)
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(s"CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION '$writerPath'")
    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(5)))
  }

  // varchar values beyond 2MB (4,000,000 chars) round-trip with full length.
  test("Test sdk with longstring with more than 2MB length") {
    // here we specify the longstring column as varchar
    val schema = new StringBuilder()
      .append("[ \n")
      .append("   {\"name\":\"string\"},\n")
      .append("   {\"address\":\"varchar\"},\n")
      .append("   {\"age\":\"int\"}\n")
      .append("]")
      .toString()
    val builder = CarbonWriter.builder()
    val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
    val varCharLen = 4000000
    for (i <- 0 until 3) {
      writer
        .write(Array[String](s"name_$i",
          RandomStringUtils.randomAlphabetic(varCharLen),
          i.toString))
    }
    writer.close()

    assert(FileFactory.getCarbonFile(writerPath).exists)
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(s"CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION '$writerPath'")
    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(3)))

    val op = sql("select address from sdkTable limit 1").collectAsList()
    assert(op.get(0).getString(0).length == varCharLen)
  }

  // varchar plus date columns with an explicitly empty sort-column list.
  test("Test sdk with longstring with empty sort column and some direct dictionary columns") {
    // here we specify the longstring column as varchar
    val schema = new StringBuilder()
      .append("[ \n")
      .append("   {\"address\":\"varchar\"},\n")
      .append("   {\"date1\":\"date\"},\n")
      .append("   {\"date2\":\"date\"},\n")
      .append("   {\"age\":\"int\"}\n")
      .append("]")
      .toString()
    val builder = CarbonWriter.builder()
    val writer = builder
      .outputPath(writerPath)
      .sortBy(Array[String]())
      .withCsvInput(Schema.parseJson(schema)).build()

    for (i <- 0 until 5) {
      writer
        .write(Array[String](RandomStringUtils.randomAlphabetic(40000),
          "1999-12-01",
          "1998-12-01",
          i.toString))
    }
    writer.close()

    assert(FileFactory.getCarbonFile(writerPath).exists)
    sql("DROP TABLE IF EXISTS sdkTable")
    sql(s"CREATE EXTERNAL TABLE sdkTable STORED AS carbondata LOCATION '$writerPath'")
    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(5)))
  }
}

/** Test helper that decodes a JSON payload into an Avro GenericRecord. */
object avroUtil {
  /**
   * Converts a JSON record string into an Avro GenericRecord conforming to
   * the given Avro schema (both supplied in JSON form).
   *
   * @param json       the record payload as a JSON string
   * @param avroSchema the Avro schema as a JSON string
   * @return the decoded GenericRecord
   */
  def jsonToAvro(json: String, avroSchema: String): GenericRecord = {
    var input: InputStream = null
    var writer: DataFileWriter[GenericRecord] = null
    var output: ByteArrayOutputStream = null
    try {
      val schema = new org.apache.avro.Schema.Parser().parse(avroSchema)
      val reader = new GenericDatumReader[GenericRecord](schema)
      input = new ByteArrayInputStream(json.getBytes())
      output = new ByteArrayOutputStream()
      val din = new DataInputStream(input)
      writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
      writer.create(schema, output)
      val decoder = DecoderFactory.get().jsonDecoder(schema, din)
      // last expression is the result; no `return` needed
      reader.read(null, decoder)
    } finally {
      // Null-check before closing: if schema parsing throws, input/writer
      // were never assigned and the original unconditional close() would NPE,
      // masking the real failure.
      if (input != null) {
        input.close()
      }
      if (writer != null) {
        writer.close()
      }
    }
  }
  // scalastyle:on lineLength
}
